package org.zjvis.datascience.service.dataset.quartz;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
import org.zjvis.datascience.common.dto.DatasetDTO;
import org.zjvis.datascience.service.mapper.DatasetMapper;

import javax.annotation.Resource;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * @description Keeps the Quartz scheduler in sync with the datasets returned by
 * {@code DatasetMapper#getAllTableDataSourceNeedScheduled()}: schedules new jobs,
 * reschedules jobs whose cron expression changed, and deletes jobs that are no
 * longer backed by a returned dataset.
 * @date 2021-12-06
 */
@Configuration
@Component
@EnableScheduling
public class ScheduledTask {

    private static final Logger log = LoggerFactory.getLogger(ScheduledTask.class);

    /** Group under which every per-dataset job is registered. */
    private static final String JOB_GROUP = "work";

    /** Name of the job that refreshes the cron settings; the clean-up pass must never delete it. */
    private static final String MAIN_JOB_NAME = "main-job";

    @Resource(name = "scheduler")
    private Scheduler scheduler;

    @Autowired
    DatasetMapper datasetMapper;

    /**
     * Schedules new jobs, updates changed schedules and removes cancelled jobs.
     *
     * <p>A dataset whose stored configuration cannot be used (malformed JSON, missing or
     * invalid {@code crontab}) is logged and skipped so that it cannot block the remaining
     * datasets or the clean-up pass. Any job already registered for such a dataset is left
     * untouched.
     *
     * @throws SchedulerException if the scheduler fails while scheduling, rescheduling or
     *                            listing jobs
     */
    public void scheduleJob() throws SchedulerException {
        List<DatasetDTO> datasets = datasetMapper.getAllTableDataSourceNeedScheduled();
        Set<JobKey> activeJobKeys = new HashSet<>();
        for (DatasetDTO dataset : datasets) {
            JobKey jobKey = new JobKey("job_" + dataset.getId(), JOB_GROUP);
            // Registered up front so a dataset with a broken configuration keeps its existing job.
            activeJobKeys.add(jobKey);
            try {
                syncDatasetJob(dataset, jobKey);
            } catch (RuntimeException e) {
                // Bad stored data (e.g. unparsable JSON) must only affect this dataset.
                log.error("quartz schedule job error! datasetId={}", dataset.getId(), e);
            }
        }

        // Remove jobs whose schedule was stopped or whose table was deleted.
        for (JobKey existingKey : scheduler.getJobKeys(GroupMatcher.anyGroup())) {
            if (activeJobKeys.contains(existingKey) || MAIN_JOB_NAME.equals(existingKey.getName())) {
                continue;
            }
            try {
                scheduler.deleteJob(existingKey);
            } catch (SchedulerException e) {
                log.error("quartz delete job error!", e);
            }
        }
    }

    /**
     * Creates the job for one dataset, or aligns its trigger with the configured cron expression.
     *
     * @param dataset dataset whose incremental-data configuration carries the {@code crontab}
     * @param jobKey  key of the job that belongs to this dataset
     * @throws SchedulerException if the scheduler rejects the operation
     */
    private void syncDatasetJob(DatasetDTO dataset, JobKey jobKey) throws SchedulerException {
        JSONObject config = JSON.parseObject(dataset.getIncrementalDataConfig());
        String crontab = config == null ? null : config.getString("crontab");
        if (crontab == null || !CronExpression.isValidExpression(crontab)) {
            log.error("quartz skip dataset {}: invalid crontab '{}'", dataset.getId(), crontab);
            return;
        }

        TriggerKey triggerKey = new TriggerKey("trigger_" + dataset.getId());
        if (!scheduler.checkExists(jobKey)) {
            JobDetail jobDetail = JobBuilder.newJob(DataXJob.class)
                    .withIdentity(jobKey)
                    .build();
            if (dataset.getDataJson() != null) {
                JSONObject dataJson = JSON.parseObject(dataset.getDataJson());
                if (dataJson != null) {
                    jobDetail.getJobDataMap().put("targetTable", dataJson.getString("table"));
                }
            }
            scheduler.scheduleJob(jobDetail, buildCronTrigger(triggerKey, jobKey, crontab));
            return;
        }

        Trigger currentTrigger = scheduler.getTrigger(triggerKey);
        if (currentTrigger == null) {
            // The job is present but its trigger is not: attach a fresh trigger to the job.
            scheduler.scheduleJob(buildCronTrigger(triggerKey, jobKey, crontab));
        } else if (!(currentTrigger instanceof CronTrigger)
                || !crontab.equals(((CronTrigger) currentTrigger).getCronExpression())) {
            scheduler.rescheduleJob(triggerKey, buildCronTrigger(triggerKey, jobKey, crontab));
        }
    }

    /**
     * Builds a cron trigger bound to the given job.
     *
     * @param triggerKey identity of the trigger
     * @param jobKey     job the trigger fires
     * @param crontab    cron expression, already validated by the caller
     * @return the new trigger
     */
    private CronTrigger buildCronTrigger(TriggerKey triggerKey, JobKey jobKey, String crontab) {
        return TriggerBuilder.newTrigger()
                .withIdentity(triggerKey)
                .forJob(jobKey)
                .withSchedule(CronScheduleBuilder.cronSchedule(crontab))
                .build();
    }
}